
[Data] speedup checkpoint filter 5x #60002

Closed
wxwmd wants to merge 1 commit into ray-project:master from wxwmd:speedup_ckpt_filter

Conversation

@wxwmd
Contributor

@wxwmd wxwmd commented Jan 9, 2026

Modification

I'm using Ray Data's checkpoint. My data has 115 million records, with primary key {"id": str}. When I use Checkpoint to filter the input blocks, it takes several hours.

I checked where the performance bottleneck is and found it in the filter_with_ckpt_chunk function in checkpoint_filter.py. I added some timing logs:

# Get all chunks of the checkpointed ID column.
ckpt_chunks = checkpointed_ids[self.id_column].chunks
# Convert the block's ID column to a numpy array for fast processing.
block_ids = block[self.id_column].to_numpy()

def filter_with_ckpt_chunk(ckpt_chunk: pyarrow.ChunkedArray) -> numpy.ndarray:
    t1 = time.time()
    ckpt_ids = transform_pyarrow.to_numpy(ckpt_chunk, zero_copy_only=False)
    print(f"ckpt_ids to numpy cost time {time.time()-t1}s")
   
    ...
    t2 = time.time()
    sorted_indices = numpy.searchsorted(ckpt_ids, block_ids)
    print(f"searchsorted costs {time.time()-t2}s")

The ckpt_chunk has shape (115022113,) and block_ids has shape (14534,). I got:

ckpt_ids to numpy cost time: 6.057122468948364s
searchsorted costs 0.11587834358215332s

We can see from the perf test that:

  1. ckpt_chunks contains only one chunk, because the chunks have already been combined by _combine_chunks.
  2. That single chunk holds 115 million ids, so converting it from pyarrow to numpy takes about 6 s.
  3. ckpt_ids = transform_pyarrow.to_numpy(ckpt_chunk, zero_copy_only=False) is executed once for every input block, so the 6 s conversion is paid again and again, which dominates the filtering time.

This PR obtains the ckpt_ids numpy array in advance, avoiding the repeated conversion. In my tests, this reduces the filtering time from 5 hours to 40 minutes.
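
For reference, a minimal self-contained sketch of the pattern this PR moves to: convert the checkpoint ID column to numpy once and reuse that array for every block. The table construction and mask logic here are illustrative, not the actual checkpoint_filter.py code.

import numpy as np
import pyarrow as pa

# Illustrative data; the real checkpoint is a pyarrow Table with an "id" column.
checkpointed_ids = pa.table({"id": [f"row-{i}" for i in range(100_000)]})

# One-time conversion (previously repeated inside the per-block filter):
combined = checkpointed_ids.combine_chunks()
ckpt_ids = combined["id"].chunk(0).to_numpy(zero_copy_only=False)
ckpt_ids.sort()  # searchsorted requires a sorted haystack

def filter_block(block_ids: np.ndarray) -> np.ndarray:
    """Boolean mask of block rows that are NOT in the checkpoint."""
    idx = np.searchsorted(ckpt_ids, block_ids)
    idx = np.clip(idx, 0, len(ckpt_ids) - 1)
    return ckpt_ids[idx] != block_ids

print(filter_block(np.array(["row-5", "not-in-ckpt"])))  # [False  True]

With this shape, the expensive to_numpy conversion happens once up front, and the only per-block work is the searchsorted lookup, which the logs above show costs roughly 0.1 s.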

Notes:

In this PR, each read task reads the ckpt_ids as a numpy.ndarray from the object store rather than in Arrow format. This increases I/O and memory overhead, because Arrow arrays usually take less space. In my experiment, the pyarrow array (115 million rows, string-typed) used 1.7 GB of memory, while the numpy array used 9 GB. However, I think this memory overhead is acceptable given the performance improvement.
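
The footprint gap can be reproduced roughly as follows; the exact ratio depends on ID length and per-string Python overhead, so this is only an order-of-magnitude illustration rather than the measurement above.

import sys
import pyarrow as pa

n = 1_000_000
ids = [f"{i:032d}" for i in range(n)]  # 32-character string IDs

arrow_col = pa.array(ids, type=pa.string())            # contiguous offset + data buffers
numpy_col = arrow_col.to_numpy(zero_copy_only=False)   # object array of Python str

arrow_mb = arrow_col.nbytes / 1e6
# The object array stores an 8-byte pointer per row plus a separate Python str object.
numpy_mb = (numpy_col.nbytes + sum(sys.getsizeof(s) for s in numpy_col)) / 1e6

print(f"arrow: {arrow_mb:.0f} MB, numpy object array: {numpy_mb:.0f} MB")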

@wxwmd wxwmd requested a review from a team as a code owner January 9, 2026 09:15

@gemini-code-assist gemini-code-assist bot left a comment


Code Review

This pull request introduces a significant performance optimization for checkpoint filtering by converting checkpointed IDs to a NumPy array once, rather than for every block. The changes are well-implemented and consistent across the modified files. My review includes a couple of suggestions to enhance code clarity and maintainability.

Comment on lines 689 to 695
combined_ckpt_block = transform_pyarrow.combine_chunks(pyarrow_checkpointed_ids)

combine_ckpt_chunks = combined_ckpt_block[ID_COL].chunks
assert len(combine_ckpt_chunks) == 1
# Convert checkpoint chunk to numpy for fast search.
# Use internal helper function for consistency and robustness (handles null-typed arrays, etc.)
ckpt_ids = transform_pyarrow.to_numpy(combine_ckpt_chunks[0], zero_copy_only=False)

Severity: medium

This logic for converting a pyarrow Table to a numpy array of IDs is duplicated from _combine_chunks in checkpoint_filter.py. To improve maintainability, consider extracting this logic into a non-remote helper function in checkpoint_filter.py and calling it from both _combine_chunks and this test. This would avoid having to update the logic in two places if it ever changes.
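
A possible shape for such a shared helper, assuming transform_pyarrow refers to ray.data._internal.arrow_ops.transform_pyarrow as in the snippets above; the function name and placement are suggestions, not an existing Ray Data API.

import numpy as np
import pyarrow

# Assumed import path for the helper module used in the snippets above.
from ray.data._internal.arrow_ops import transform_pyarrow


def checkpointed_ids_to_numpy(checkpointed_ids: pyarrow.Table, id_column: str) -> np.ndarray:
    """Combine the checkpoint ID column into a single chunk and convert it to numpy."""
    combined = transform_pyarrow.combine_chunks(checkpointed_ids)
    chunks = combined[id_column].chunks
    assert len(chunks) == 1, "combine_chunks should produce exactly one chunk"
    return transform_pyarrow.to_numpy(chunks[0], zero_copy_only=False)

Both _combine_chunks in checkpoint_filter.py and the test could then call this single function.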

@wxwmd wxwmd changed the title from "[Data] speedup ckpt filter 5x" to "[Data] speedup checkpoint filter 5x" Jan 9, 2026
@ray-gardener ray-gardener bot added the data (Ray Data-related issues) and community-contribution (Contributed by the community) labels Jan 9, 2026
@owenowenisme owenowenisme self-assigned this Jan 10, 2026
@wingkitlee0
Contributor

This is nice. Some optimizations can be considered for future PRs:

  • It may be worth sorting the block_ids when performing searchsorted(checkpoint_ids, block_ids); there are some numpy-internal optimizations that benefit from sorted queries. We may want to restore the original order for the output, though.
  • The industry-standard sortedcontainers library uses a list of lists (i.e., chunking). We may be able to do something similar: chunk the long array into multiple shorter ones (<1M elements) so that each one fits in cache individually.
  • Related to the second point, partitioning may help avoid repartition(1) when loading the checkpoint (I haven't read through how the checkpoint is constructed yet, but repartition(1) seems heavy if the pipeline has almost finished).

"filtering time from 5 hours to 40 minutes."

Just to understand better, this is the total time spent in the filter function, right?

@wxwmd
Contributor Author

wxwmd commented Jan 12, 2026

"Just to understand better, this is the total time spent in the filter function, right?"

@wingkitlee0 yes, this is the total time spent in the filter function. This PR addresses the time overhead caused by repeated pyarrow->numpy copies. After this, I believe the points you mentioned can further improve performance, and I'm interested in implementing them.
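
A rough sketch of the first suggestion (sort block_ids before the lookup, then scatter the mask back to the original row order); whether this helps in practice depends on numpy internals and the data, so treat it as an untested idea.

import numpy as np

def filter_mask_sorted(ckpt_ids: np.ndarray, block_ids: np.ndarray) -> np.ndarray:
    """Boolean mask of block rows not present in ckpt_ids (ckpt_ids must be sorted)."""
    order = np.argsort(block_ids, kind="stable")
    sorted_ids = block_ids[order]

    # With sorted needles, searchsorted visits ckpt_ids at non-decreasing positions.
    idx = np.searchsorted(ckpt_ids, sorted_ids)
    idx = np.clip(idx, 0, len(ckpt_ids) - 1)
    sorted_mask = ckpt_ids[idx] != sorted_ids

    # Scatter the mask back to the block's original row order.
    mask = np.empty_like(sorted_mask)
    mask[order] = sorted_mask
    return mask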

@wxwmd wxwmd force-pushed the speedup_ckpt_filter branch from c6d23db to 0fb1a5d on January 12, 2026 07:17
@wxwmd wxwmd force-pushed the speedup_ckpt_filter branch from 8282751 to 8be95cf on January 12, 2026 12:14
@wxwmd
Contributor Author

wxwmd commented Jan 13, 2026

Seems kind of messy; I will split this into 3 PRs.

@wxwmd wxwmd force-pushed the speedup_ckpt_filter branch 2 times, most recently from c4eab5f to 0e213dc on January 13, 2026 11:05
@wxwmd wxwmd marked this pull request as draft January 14, 2026 07:35
Signed-off-by: xiaowen.wxw <wxw403883@alibaba-inc.com>

keep this pr simple
Signed-off-by: xiaowen.wxw <wxw403883@alibaba-inc.com>
@wxwmd
Contributor Author

wxwmd commented Jan 19, 2026

moved to #60294
